package cn.xw.ProducerReleaseConfirmation;

import cn.xw.utils.ChannelUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.MessageProperties;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
 * @author AnHui OuYang
 * @version 1.0
 * created at 2023-04-11 22:14
 * 生产者（生产任务）
 */
public class Producer {

    //通过日志管理器获取Logger对象
    static Logger logger = LogManager.getLogger(Producer.class);
    //单个发布确认
    public static final String SINGLE_RELEASE_CONFIRMATION = "singleReleaseConfirmation";
    //批量确认
    public static final String BATCH_CONFIRMATION = "batchConfirmation";
    //异步发布确认
    public static final String ASYNC_RELEASE_CONFIRMATION = "asyncReleaseConfirmation";

    /***
     * 调用 单个发布确认，批量发布确认，异步发布确认
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        //单个发布确认调用
        //singleReleaseConfirmation();
        //批量发布确认
        //batchConfirmation();
        //异步发布确认
        asyncReleaseConfirmation();
    }

    /***
     * 单个发布确认   发送100条数据：生产者消息发送完成【单个发布确认，用时：2181】
     */
    public static void singleReleaseConfirmation() throws IOException, InterruptedException {
        //通过工具类获取一个信道
        Channel channel = ChannelUtil.getChannel();
        //声明一个队列
        channel.queueDeclare(SINGLE_RELEASE_CONFIRMATION, true, false, false, null);
        //开启发布确认功能
        channel.confirmSelect();
        //记录开始时间
        long start = System.currentTimeMillis();
        //发送的消息
        for (int i = 1; i <= 100; i++) {
            byte[] msg = ("这是一个编号为：" + i + " 的待处理的消息").getBytes(StandardCharsets.UTF_8);
            //依次发送每一条消息
            channel.basicPublish("", SINGLE_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
            //验证是否发送成功（等待确认）
            //boolean waitTime = channel.waitForConfirms(3000); //发送三秒后没得到回复将断定未发送过去
            boolean waitSuccess = channel.waitForConfirms(); //将一直等待刚才发送的消息是否被RabbitMQ确认
            //判断
            if (waitSuccess) {
                logger.debug("发送成功了，已发送到RabbitMQ队列中，发送信息为：{}", i);
            } else {
                logger.debug("发送失败，未收到RabbitMQ确认信息，发送信息为：{}", i);
            }
        }
        long end = System.currentTimeMillis();
        logger.info("生产者消息发送完成【单个发布确认，用时：{}】", end - start);
    }

    /***
     * 批量发布确认  发送100条数据：生产者消息发送完成【批量发布确认，用时：869】
     */
    public static void batchConfirmation() throws IOException, InterruptedException {
        //通过工具类获取一个信道
        Channel channel = ChannelUtil.getChannel();
        //声明一个队列
        channel.queueDeclare(BATCH_CONFIRMATION, true, false, false, null);
        //开启发布确认功能
        channel.confirmSelect();
        //记录开始时间
        long start = System.currentTimeMillis();
        //定义每次批量处理多少消息进行确认
        int batchNumber = 10;
        //发送的消息
        for (int i = 1; i <= 100; i++) {
            byte[] msg = ("这是一个编号为：" + i + " 的待处理的消息").getBytes(StandardCharsets.UTF_8);
            //依次发送每一条消息
            channel.basicPublish("", BATCH_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
            //验证是否发送成功，对10求余，代表每发送10条消息会批量看看是否成功（等待确认）
            if (i % batchNumber == 0) {
                boolean waitSuccess = channel.waitForConfirms();
                //判断
                if (waitSuccess) {
                    logger.info("发送成功了，已发送到RabbitMQ队列中，发送信息范围：{}", (i - (batchNumber - 1)) + " ~ " + i);
                } else {
                    logger.info("发送失败，未收到RabbitMQ确认信息，发送信息范围：{}", (i - (batchNumber - 1)) + " ~ " + i);
                }
            }
        }
        long end = System.currentTimeMillis();
        logger.info("生产者消息发送完成【批量发布确认，用时：{}】", end - start);
    }

    /***
     * 异步发布确认    发送100条数据：生产者消息发送完成【异步发布确认，用时：17】
     */
    public static void asyncReleaseConfirmation() throws IOException {
        //通过工具类获取一个信道
        Channel channel = ChannelUtil.getChannel();
        //声明一个队列
        channel.queueDeclare(ASYNC_RELEASE_CONFIRMATION, true, false, false, null);
        //开启发布确认功能
        channel.confirmSelect();
        //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Start
        //线程安全有序的一个哈希表Map，适用于高并发的情况
        //1.轻松的将序号与消息进行关联 2.轻松批量删除条目 只要给到序列号 3.支持并发访问
        ConcurrentSkipListMap<Object, Object> outstandingConfirms = new ConcurrentSkipListMap<>();

        //添加监听器，监听返回(监听器一定要再发送消息之前就创建和监听) 参数1：回调成功 参数2：回调失败
        channel.addConfirmListener((deliveryTag, multiple) -> {
            //这个是回调成功的，回调成功后把集合中的数据删除，最终就代表失败的多少
            if (multiple) {
                ConcurrentNavigableMap<Object, Object> navigableMap = outstandingConfirms.headMap(deliveryTag, true);
                navigableMap.clear();
            } else {
                outstandingConfirms.remove(deliveryTag);
            }
            logger.info("回调成功的数据：{}   是否批量确认：{}", deliveryTag, multiple);
        }, (deliveryTag, multiple) -> {
            logger.info("回调失败的数据：{}", deliveryTag);
        });
        //~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ End

        //记录开始时间
        long start = System.currentTimeMillis();
        //发送的消息
        for (int i = 1; i <= 100; i++) {
            byte[] msg = ("这是一个编号为：" + i + " 的待处理的消息").getBytes(StandardCharsets.UTF_8);
            //把要发送的数据记录到集合中
            outstandingConfirms.put(channel.getNextPublishSeqNo(), msg);
            //依次发送每一条消息
            channel.basicPublish("", ASYNC_RELEASE_CONFIRMATION, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
        }
        long end = System.currentTimeMillis();
        logger.info("生产者消息发送完成【异步发布确认，用时：{}】", end - start);
    }
}
